// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package record reads and writes sequences of records. Each record is a stream
// of bytes that completes before the next record starts.
//
// When reading, call Next to obtain an io.Reader for the next record. Next will
// return io.EOF when there are no more records. It is valid to call Next
// without reading the current record to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next record. Calling
// Next finishes the current record. Call Close to finish the final record.
//
// Optionally, call Flush to finish the current record and flush the underlying
// writer without starting a new record. To start a new record after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//    func read(r io.Reader) ([]string, error) {
//        var ss []string
//        records := record.NewReader(r)
//        for {
//            rec, err := records.Next()
//            if err == io.EOF {
//                break
//            }
//            if err != nil {
//                return nil, err
//            }
//            s, err := ioutil.ReadAll(rec)
//            if err != nil {
//                return nil, err
//            }
//            ss = append(ss, string(s))
//        }
//        return ss, nil
//    }
//
//    func write(w io.Writer, ss []string) error {
//        records := record.NewWriter(w)
//        for _, s := range ss {
//            rec, err := records.Next()
//            if err != nil {
//                return err
//            }
//            if _, err := rec.Write([]byte(s)); err != nil {
//                return err
//            }
//        }
//        return records.Close()
//    }
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32KiB. Any unused bytes in a
// block must be zero.
//
// A record maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full record, or the
// first, middle or last chunk of a multi-chunk record. A multi-chunk record
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package record

// TODO: implement the recovery algorithm.

// The C++ Level-DB code calls this the log, but it has been renamed to record
// to avoid clashing with the standard log package, and because it is generally
// useful outside of logging. The C++ code also uses the term "physical record"
// instead of "chunk", but "chunk" is shorter and less confusing.

import (
    "encoding/binary"
    "errors"
    "io"

    "code.google.com/p/leveldb-go/leveldb/crc"
)

// These constants are part of the wire format and should not be changed.
const (
    fullChunkType   = 1 // the chunk holds an entire record
    firstChunkType  = 2 // the chunk is the first in a multi-chunk record
    middleChunkType = 3 // the chunk is in the interior of a multi-chunk record
    lastChunkType   = 4 // the chunk is the last in a multi-chunk record
)

const (
    // blockSize is the size in bytes of every block except possibly the last.
    blockSize  = 32 * 1024
    // headerSize is the size in bytes of a chunk header: a 4 byte checksum,
    // a 2 byte little-endian length, and a 1 byte chunk type.
    headerSize = 7
)

// flusher is implemented by writers that can flush buffered data to an
// underlying sink, such as a *bufio.Writer.
type flusher interface {
    Flush() error
}

// Reader reads records from an underlying io.Reader.
//
// Reader is not safe to use concurrently.
type Reader struct {
    // r is the underlying reader.
    r io.Reader
    // seq is the sequence number of the current record. It is incremented on
    // every Next call and is used to detect stale singleReaders.
    seq int
    // buf[i:j] is the unread portion of the current chunk's payload.
    // The low bound, i, excludes the chunk header.
    i, j int
    // n is the number of bytes of buf that are valid. Once reading has started,
    // only the final block can have n < blockSize.
    n int
    // started is whether Next has been called at all.
    started bool
    // last is whether the current chunk is the last chunk of the record.
    last bool
    // err is any accumulated error. Once set, all subsequent operations fail
    // with it.
    err error
    // buf is the buffer holding the block currently being read.
    buf [blockSize]byte
}

// NewReader returns a new reader that reads records from r.
func NewReader(r io.Reader) *Reader {
    rdr := &Reader{r: r}
    return rdr
}

// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary.
//
// If wantFirst is true, only a full or first chunk is acceptable: other chunk
// types are skipped, and a zeroed chunk header causes the remainder of the
// block to be skipped (rather than reported as an error), since zero padding
// is common when the record file was created via mmap.
func (r *Reader) nextChunk(wantFirst bool) error {
    for {
        if r.j+headerSize <= r.n {
            // Parse the 7 byte chunk header: checksum, length, chunk type.
            checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
            length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
            chunkType := r.buf[r.j+6]

            if checksum == 0 && length == 0 && chunkType == 0 {
                if wantFirst {
                    // Skip the rest of the block, if it looks like it is all zeroes.
                    // This is common if the record file was created via mmap.
                    r.i = r.n
                    r.j = r.n
                    continue
                }
                return errors.New("leveldb/record: invalid chunk")
            }

            r.i = r.j + headerSize
            r.j = r.j + headerSize + int(length)
            if r.j > r.n {
                return errors.New("leveldb/record: invalid chunk (length overflows block)")
            }
            // The checksum covers the chunk type byte as well as the payload.
            if checksum != crc.New(r.buf[r.i-1:r.j]).Value() {
                return errors.New("leveldb/record: invalid chunk (checksum mismatch)")
            }
            if wantFirst && chunkType != fullChunkType && chunkType != firstChunkType {
                // Not the start of a record; keep scanning.
                continue
            }
            r.last = chunkType == fullChunkType || chunkType == lastChunkType
            return nil
        }
        if r.n < blockSize && r.started {
            // A short block can only be the final block. Unconsumed bytes at
            // its end mean the stream was truncated mid-record.
            if r.j != r.n {
                return io.ErrUnexpectedEOF
            }
            return io.EOF
        }
        // Read the next block. io.ErrUnexpectedEOF is tolerated here because
        // the final block may be shorter than blockSize.
        n, err := io.ReadFull(r.r, r.buf[:])
        if err != nil && err != io.ErrUnexpectedEOF {
            return err
        }
        r.i, r.j, r.n = 0, 0, n
    }
}

// Next returns a reader for the next record. It returns io.EOF if there are no
// more records. The reader returned becomes stale after the next Next call,
// and should no longer be used.
func (r *Reader) Next() (io.Reader, error) {
    r.seq++
    if r.err != nil {
        return nil, r.err
    }
    // Discard any unread remainder of the previous record.
    r.i = r.j
    if r.err = r.nextChunk(true); r.err != nil {
        return nil, r.err
    }
    r.started = true
    return singleReader{r, r.seq}, nil
}

// singleReader is the io.Reader for the current record, as returned by
// (*Reader).Next. It becomes stale once Next is called again.
type singleReader struct {
    r   *Reader // the Reader this record is being read from
    seq int     // the value of r.seq when this reader was created
}

// Read implements io.Reader, copying unread bytes of the current record into
// p. It returns io.EOF once the record is exhausted, and an error if the
// parent Reader has since moved on to another record.
func (x singleReader) Read(p []byte) (int, error) {
    rd := x.r
    if rd.seq != x.seq {
        return 0, errors.New("leveldb/record: stale reader")
    }
    if rd.err != nil {
        return 0, rd.err
    }
    // Advance past exhausted chunks until one with unread payload is found,
    // stopping at the record's final chunk.
    for rd.i == rd.j {
        if rd.last {
            return 0, io.EOF
        }
        rd.err = rd.nextChunk(false)
        if rd.err != nil {
            return 0, rd.err
        }
    }
    count := copy(p, rd.buf[rd.i:rd.j])
    rd.i += count
    return count, nil
}

// Writer writes records to an underlying io.Writer.
//
// Writer is not safe to use concurrently.
type Writer struct {
    // w is the underlying writer.
    w io.Writer
    // seq is the sequence number of the current record. It is incremented on
    // every Next, Flush and Close call, and is used to detect stale
    // singleWriters.
    seq int
    // f is w as a flusher, or nil if w does not implement flusher.
    f flusher
    // buf[i:j] is the bytes that will become the current chunk.
    // The low bound, i, includes the chunk header.
    i, j int
    // buf[:written] has already been written to w.
    // written is zero unless Flush has been called.
    written int
    // first is whether the current chunk is the first chunk of the record.
    first bool
    // pending is whether a chunk is buffered but not yet written.
    pending bool
    // err is any accumulated error. Once set, all subsequent operations fail
    // with it.
    err error
    // buf is the buffer holding the block currently being written.
    buf [blockSize]byte
}

// NewWriter returns a new Writer that writes records to w. If w implements
// the flusher interface, the returned Writer's Flush method also flushes w.
func NewWriter(w io.Writer) *Writer {
    wr := &Writer{w: w}
    wr.f, _ = w.(flusher)
    return wr
}

// fillHeader fills in the checksum, length and chunk-type bytes of the
// pending chunk's header. last indicates whether this chunk ends the current
// record.
func (w *Writer) fillHeader(last bool) {
    if w.i+headerSize > w.j || w.j > blockSize {
        panic("leveldb/record: bad writer state")
    }
    // The chunk type is determined by whether this chunk starts and/or ends
    // the record.
    var chunkType byte
    switch {
    case last && w.first:
        chunkType = fullChunkType
    case last:
        chunkType = lastChunkType
    case w.first:
        chunkType = firstChunkType
    default:
        chunkType = middleChunkType
    }
    // The type byte must be in place before the checksum is computed, since
    // the checksum covers the chunk type as well as the payload.
    w.buf[w.i+6] = chunkType
    binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
    binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
}

// writeBlock writes the unwritten remainder of the buffered block to the
// underlying writer, then resets the buffer, reserving headerSize bytes for
// the next chunk's header.
func (w *Writer) writeBlock() {
    _, w.err = w.w.Write(w.buf[w.written:])
    w.i, w.j, w.written = 0, headerSize, 0
}

// writePending completes the pending chunk, if any, and writes all buffered
// but not-yet-written bytes to the underlying writer.
func (w *Writer) writePending() {
    if w.err != nil {
        return
    }
    if w.pending {
        w.pending = false
        w.fillHeader(true)
    }
    _, w.err = w.w.Write(w.buf[w.written:w.j])
    w.written = w.j
}

// Close finishes the current record, writes it to the underlying writer, and
// marks the Writer closed; every subsequent operation fails.
func (w *Writer) Close() error {
    w.seq++
    w.writePending()
    if err := w.err; err != nil {
        return err
    }
    w.err = errors.New("leveldb/record: closed Writer")
    return nil
}

// Flush finishes the current record, writes all buffered bytes to the
// underlying writer, and, if that writer implements
// interface{ Flush() error }, flushes it as well.
func (w *Writer) Flush() error {
    w.seq++
    w.writePending()
    if w.err == nil && w.f != nil {
        w.err = w.f.Flush()
    }
    return w.err
}

// Next returns a writer for the next record. The writer returned becomes stale
// after the next Close, Flush or Next call, and should no longer be used.
func (w *Writer) Next() (io.Writer, error) {
    w.seq++
    if w.err != nil {
        return nil, w.err
    }
    if w.pending {
        // Finish the previous record's final chunk.
        w.fillHeader(true)
    }
    w.i = w.j
    w.j += headerSize
    if w.j > blockSize {
        // There is no room in this block for another chunk header: pad the
        // remainder of the block with zeroes and start a fresh block.
        for k := w.i; k < blockSize; k++ {
            w.buf[k] = 0
        }
        w.writeBlock()
        if w.err != nil {
            return nil, w.err
        }
    }
    w.first = true
    w.pending = true
    return singleWriter{w, w.seq}, nil
}

// singleWriter is the io.Writer for the current record, as returned by
// (*Writer).Next. It becomes stale once Close, Flush or Next is called.
type singleWriter struct {
    w   *Writer // the Writer this record is being written to
    seq int     // the value of w.seq when this writer was created
}

// Write implements io.Writer, appending p to the current record. It returns
// an error if the parent Writer has since moved on to another record.
func (x singleWriter) Write(p []byte) (int, error) {
    w := x.w
    if w.seq != x.seq {
        return 0, errors.New("leveldb/record: stale writer")
    }
    if w.err != nil {
        return 0, w.err
    }
    total := len(p)
    for len(p) > 0 {
        if w.j == blockSize {
            // The block is full: finish the current chunk as a non-final one
            // and start a fresh block.
            w.fillHeader(false)
            w.writeBlock()
            if w.err != nil {
                return 0, w.err
            }
            w.first = false
        }
        // Buffer as much of p as fits in the current block.
        n := copy(w.buf[w.j:], p)
        p = p[n:]
        w.j += n
    }
    return total, nil
}
